---
title: "Daegu Risk Zones"
format:
html:
theme: cosmo
toc: true
toc-depth: 2
code-fold: true
code-tools: true
execute:
echo: true
warning: false
message: false
freeze: auto
jupyter: python3
lang: ko
---
## 1) 코드
### 데이터 확인 1: 결측치 전체 확인
```{python}
import pandas as pd
import numpy as np
df_building_original = pd.read_csv("./Data/건축물대장_통합.csv")
# 주요 컬럼별 결측치 개수 확인
print("대지위치 결측:", df_building_original["대지위치"].isna().sum())
print("지상층수 결측:", df_building_original["지상층수"].isna().sum())
print("높이(m) 결측:", df_building_original["높이(m)"].isna().sum())
print("구조코드명 결측:", df_building_original["구조코드명"].isna().sum())
print("기타구조 결측:", df_building_original["기타구조"].isna().sum())
print("사용승인년도 결측:", df_building_original["사용승인년도"].isna().sum())
print("위도 결측:", df_building_original["위도"].isna().sum())
print("경도 결측:", df_building_original["경도"].isna().sum())
print("위도/경도 모두 결측:", (df_building_original["위도"].isna() & df_building_original["경도"].isna()).sum())
```
### 데이터 확인 2-1: 위경도 결측치 중 구조코드명별 퍼센트
```{python}
# 위경도 결측치 중 구조코드명별 퍼센트
df_building_filter = df_building_original.loc[df_building_original["위도"].isna() & df_building_original["경도"].isna(), :]
df_building_grouped = (
df_building_filter.groupby('구조코드명', dropna=False)
.size()
.rename('수')
.reset_index()
)
df_building_grouped['퍼센트'] = (df_building_grouped['수'] / len(df_building_filter) * 100).round(2)
df_building_grouped = df_building_grouped.sort_values(['수','구조코드명'], ascending=[False, True], ignore_index=True)
df_building_grouped
```
### 데이터 확인 2-2: 위경도 결측치 중 일반목구조 비율
```{python}
# 위경도 결측치 중 일반목구조가 전체 일반목구조에 어느정도 해당하는지
original_wooden_structure = ((df_building_original["구조코드명"] == "일반목구조") & (df_building_original["위도"].notna())).sum()
filter_wooden_structure = df_building_grouped.loc[df_building_grouped["구조코드명"] == "일반목구조", "수"].values[0]
(filter_wooden_structure / original_wooden_structure * 100).round(2)
```
### 위도 경도 컬럼 추가
```{python}
#| eval: false
import os
import time
import json
import requests
import pandas as pd
# 기본 설정
KAKAO_REST_KEY = "f939970b0ab002e6aa011535f5388344"
KAKAO_URL = "https://dapi.kakao.com/v2/local/search/address.json"
HEADERS = {"Authorization": f"KakaoAK {KAKAO_REST_KEY}"}
REQUEST_INTERVAL = 0.15
CACHE_PATH = "./kakao_geocode_cache.json"
# 캐시 로드/세이브
def load_cache(path=CACHE_PATH):
if os.path.exists(path):
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except:
return {}
return {}
def save_cache(cache, path=CACHE_PATH):
try:
with open(path, "w", encoding="utf-8") as f:
json.dump(cache, f, ensure_ascii=False)
except:
pass
cache = load_cache()
# 카카오 지오코딩 함수
def kakao_geocode(address):
if not isinstance(address, str) or not address.strip():
return None
addr = address.strip()
if addr in cache:
return cache[addr]
params = {"query": addr}
try:
r = requests.get(KAKAO_URL, headers=HEADERS, params=params, timeout=10)
if r.status_code == 200:
data = r.json()
docs = data.get("documents", [])
if docs:
d0 = docs[0]
lon, lat = float(d0["x"]), float(d0["y"])
result = {"lat": lat, "lon": lon}
else:
result = None
else:
result = None
except:
result = None
cache[addr] = result
save_cache(cache)
time.sleep(REQUEST_INTERVAL)
return result
# CSV 불러오기
df_building_original = pd.read_csv(
"../Raw Data/건축물대장/건축물대장_대구광역시_종합.csv",
sep=None, engine="python"
)
df_building_filter = df_building_original[[
"대지위치", "지상층수", "지하층수", "높이(m)", "구조코드명", "기타구조", "주용도코드명", "비상용승강기수"
]].copy()
df_building_filter["사용승인년도"] = df_building_original["사용승인일"].astype(str).str.slice(0, 4)
# 1만개씩 나누기
df_building_temp = df_building_filter.loc[:10000, :].copy()
# 좌표 변환 적용
df_building_temp["위도"] = None
df_building_temp["경도"] = None
for i, addr in enumerate(df_building_temp["대지위치"].dropna().unique(), 1):
geo = kakao_geocode(addr)
if geo:
df_building_temp.loc[df_building_temp["대지위치"] == addr, "위도"] = geo["lat"]
df_building_temp.loc[df_building_temp["대지위치"] == addr, "경도"] = geo["lon"]
if i % 500 == 0:
print(f"{i} / {len(df_building_temp['대지위치'].dropna().unique)} 처리 완료")
# 저장
df_building_temp.to_csv(
"../Raw Data/건축물대장_위도경도포함/1.csv",
index=False, encoding="utf-8-sig"
)
print("저장 완료")
# 파일 합치기
files = [
"../Raw Data/건축물대장_위도경도포함/건축물2_좌표.csv",
"../Raw Data/건축물대장_위도경도포함/건축물대장1_1.csv",
"../Raw Data/건축물대장_위도경도포함/건축물대장1_2.csv",
"../Raw Data/건축물대장_위도경도포함/건축물대장1_3.csv",
"../Raw Data/건축물대장_위도경도포함/건축물대장1_4.csv",
"../Raw Data/건축물대장_위도경도포함/건축물대장1_5.csv",
"../Raw Data/건축물대장_위도경도포함/건축물대장1_6.csv",
"../Raw Data/건축물대장_위도경도포함/대구_건축물대장_2(6~80000).csv",
"../Raw Data/건축물대장_위도경도포함/대구_건축물대장_all.csv",
"../Raw Data/건축물대장_위도경도포함/건축물대장(30000~49999).csv"
]
# 뽑을 컬럼 목록
columns_to_keep = [
"대지위치", "지상층수", "지하층수", "높이(m)", "구조코드명", "기타구조",
"주용도코드명", "비상용승강기수", "사용승인년도", "위도", "경도"
]
dfs = []
for file in files:
df = pd.read_csv(file, encoding="utf-8")
existing_cols = [col for col in columns_to_keep if col in df.columns]
df = df[existing_cols]
dfs.append(df)
merged_df = pd.concat(dfs, ignore_index=True)
# 저장
merged_df.to_csv("./Data/건축물대장_통합.csv", index=False, encoding="utf-8-sig")
```
### 소화전/소방서 최소거리 컬럼 추가
```{python}
#| eval: false
import pandas as pd
import numpy as np
# 데이터 불러오기
df_building = pd.read_csv('./Data/건축물대장_통합.csv')
df_hydrant = pd.read_csv('./Data/대구_소방장치_위치.csv')
df_firestation = pd.read_csv('./Data/대구_소방서_위치.csv')
# 거리계산 함수 정의
def haversine_min_distance(lat1, lon1, hy_lats, hy_lons):
R = 6371000 # 지구 반지름 (m)
lat1 = np.radians(lat1)
lon1 = np.radians(lon1)
dlat = hy_lats - lat1
dlon = hy_lons - lon1
a = np.sin(dlat / 2)**2 + np.cos(lat1) * np.cos(hy_lats) * np.sin(dlon / 2)**2
c = 2 * np.arcsin(np.sqrt(a))
distances = R * c
return distances.min()
# 위경도 라디안화
hydrant_lats = np.radians(df_hydrant["위도"].values)
hydrant_lons = np.radians(df_hydrant["경도"].values)
station_lats = np.radians(df_firestation["위도"].values)
station_lons = np.radians(df_firestation["경도"].values)
# 소화전거리 계산 및 추가
df_building['소화전거리'] = df_building.apply(
lambda row: haversine_min_distance(row["위도"], row["경도"], hydrant_lats, hydrant_lons),
axis=1
)
# 소방서거리 계산 및 추가
df_building["소방서거리"] = df_building.apply(
lambda row: haversine_min_distance(row["위도"], row["경도"], station_lats, station_lons),
axis=1
)
# 결과 저장
df_building.to_csv('./Data/건축물대장_소화전_소방서거리.csv', index=False)
```
### 건축물 종합 점수 산정
```{python}
#| eval: false
from datetime import date
import math, re
import numpy as np
import pandas as pd
def _parse_year(value):
if value is None:
return None
if isinstance(value, float) and math.isnan(value):
return None
if isinstance(value, int):
return value
if isinstance(value, float):
return int(value)
s = str(value).strip()
if s == "":
return None
s_num = s.replace(",", "")
if re.fullmatch(r"[+-]?\d+(\.\d+)?", s_num):
try:
return int(float(s_num))
except Exception:
return None
m = re.search(r"(\d{4})", s)
if m:
return int(m.group(1))
return None
def aging_score(value):
year = _parse_year(value)
if year is None:
return 0
current_year = date.today().year
age = current_year - year
if age < 0 or year < 1800:
return 0
if age >= 40:
return 5
elif age >= 30:
return 4
elif age >= 20:
return 3
elif age >= 10:
return 2
elif age >= 0:
return 1
else:
return 0
def _parse_floor_count(value):
if value is None:
return None
if isinstance(value, float) and math.isnan(value):
return None
if isinstance(value, (int, float)):
n = int(float(value))
return n if n >= 0 else None
s = str(value).strip()
if s == "":
return None
s_num = s.replace(",", "")
if re.fullmatch(r"[+-]?\d+(\.\d+)?", s_num):
try:
n = int(float(s_num))
return n if n >= 0 else None
except Exception:
return None
m = re.search(r"(-?\d+)", s)
if m:
n = int(m.group(1))
return n if n >= 0 else None
return None
def aboveground_floors_score(value):
floors = _parse_floor_count(value)
if floors is None:
return 0
if floors >= 30:
return 5
elif floors >= 20:
return 4
elif floors >= 10:
return 3
elif floors >= 5:
return 2
elif floors >= 1:
return 1
else:
return 0
def _parse_basement_floor_count(value):
if value is None:
return None
if isinstance(value, float) and math.isnan(value):
return None
if isinstance(value, (int, float)):
try:
n = int(float(value))
return abs(n) if n != 0 else 0
except Exception:
return None
s = str(value).strip()
if s == "":
return None
m = re.fullmatch(r"[Bb]\s*(\d+)\s*[Ff]?", s)
if m:
return int(m.group(1))
m = re.search(r"지하\s*(\d+)", s)
if m:
return int(m.group(1))
m = re.search(r"(-?\d+)", s.replace(",", ""))
if m:
try:
return abs(int(m.group(1)))
except Exception:
return None
return None
def basement_floors_score(value):
floors = _parse_basement_floor_count(value)
if floors is None:
return 0
if floors >= 3:
return 3
elif floors >= 2:
return 2
elif floors >= 1:
return 1
else:
return 0
MAIN_USE_SCORE_MAP = {
'숙박시설': 9.0,
'야영장시설': 9.0,
'관광휴게시설': 9.0,
'공장': 8.0,
'창고시설': 8.0,
'노유자시설': 7.0,
'교육연구시설': 7.0,
'교육연구및복지시설': 7.0,
'의료시설': 7.0,
'수련시설': 7.0,
'제2종근린생활시설': 5.0,
'근린생활시설': 5.0,
'제1종근린생활시설': 5.0,
'종교시설': 5.0,
'문화및집회시설': 5.0,
'운동시설': 5.0,
'업무시설': 5.0,
'판매시설': 5.0,
'위락시설': 5.0,
'판매및영업시설': 5.0,
'기타제1종근린생활시설': 5.0,
'생활편익시설': 5.0,
'소매점': 5.0,
'동물및식물관련시설': 4.0,
'위험물저장및처리시설': 4.0,
'자원순환관련시설': 4.0,
'분뇨.쓰레기처리시설': 4.0,
'방송통신시설': 4.0,
'자동차관련시설': 4.0,
'장례시설': 4.0,
'운수시설': 4.0,
'교정및군사시설': 4.0,
'국방,군사시설': 4.0,
'발전시설': 4.0,
'묘지관련시설': 4.0,
'단독주택': 2.0,
'공동주택': 2.0,
'다가구주택': 2.0,
'공공용시설': 1.0,
}
def main_use_score_exact(value) -> float:
if value is None:
return 0.0
if isinstance(value, float) and math.isnan(value):
return 0.0
s = str(value).strip()
if s == "" or s.lower() == "nan":
return 0.0
return MAIN_USE_SCORE_MAP.get(s, 0.0)
STRUCTURE_SCORE_MAP = {
'철근콘크리트구조': 0.0,
'콘크리트구조': 0.0,
'프리케스트콘크리트구조': 0.0,
'보강콘크리트조': 0.0,
'기타콘크리트구조': 0.0,
'라멘조': 0.0,
'일반철골구조': 2.0,
'경량철골구조': 2.0,
'강파이프구조': 2.0,
'철파이프조': 2.0,
'기타강구조': 2.0,
'스틸하우스조': 2.0,
'단일형강구조': 2.0,
'철골구조': 2.0,
'공업화박판강구조(PEB)': 2.0,
'트러스구조': 2.0,
'철골콘크리트구조': 2.0,
'철골철근콘크리트구조': 2.0,
'철골철근콘크리트합성구조': 2.0,
'기타철골철근콘크리트구조': 2.0,
'일반목구조': 5.0,
'목구조': 5.0,
'통나무구조': 5.0,
'트러스목구조': 5.0,
'벽돌구조': 4.0,
'블록구조': 4.0,
'시멘트블럭조': 4.0,
'조적구조': 4.0,
'기타조적구조': 4.0,
'석구조': 4.0,
'흙벽돌조': 4.0,
'조립식판넬조': 3.0,
'컨테이너조': 3.0,
'막구조': 1.0,
'기타구조': 1.0,
}
def structure_score(value) -> float:
if value is None:
return 0.0
if isinstance(value, float) and math.isnan(value):
return 0.0
s = str(value).strip()
if not s or s.lower() == "nan":
return 0.0
if s in STRUCTURE_SCORE_MAP:
return STRUCTURE_SCORE_MAP[s]
if ('목' in s) or ('통나무' in s):
return 5.0
if ('조적' in s) or ('벽돌' in s) or ('블록' in s) or ('석' in s):
return 4.0
if ('조립' in s) or ('판넬' in s) or ('컨테이너' in s):
return 3.0
if ('철골' in s) or ('강구조' in s) or ('스틸' in s) or ('파이프' in s):
return 2.0
if ('막' in s) or ('특수' in s):
return 1.0
if ('콘크리트' in s) or ('라멘' in s):
return 0.0
return 0.0
def _parse_nonneg_int_count(value):
if value is None:
return None
if isinstance(value, float) and math.isnan(value):
return None
if isinstance(value, (int, float)):
n = int(float(value))
return n if n >= 0 else None
s = str(value).strip()
if s == "" or s.lower() == "nan":
return None
m = re.search(r"(\d+)", s.replace(",", ""))
if m:
n = int(m.group(1))
return n if n >= 0 else None
return None
def emergency_elevator_score(value) -> float:
n = _parse_nonneg_int_count(value)
if n is None:
return 0.0
if n == 0:
return 5.0
elif n == 1:
return 4.0
elif n == 2:
return 3.0
elif n == 3:
return 2.0
elif n == 4:
return 1.0
else:
return 0.0
def firestation_distance_score(dist_m, cap_over_max=True, invalid_to_nan=True):
arr = np.asarray(dist_m, dtype=float)
if invalid_to_nan:
arr = np.where(arr < 0, np.nan, arr)
default_val = 5.0 if cap_over_max else np.nan
scores = np.select(
[arr < 1000, arr < 3000, arr < 5000, arr < 7000, arr < 9000],
[1.0, 2.0, 3.0, 4.0, 5.0],
default=default_val
)
if np.isscalar(dist_m):
return float(scores.item())
if isinstance(dist_m, pd.Series):
return pd.Series(scores, index=dist_m.index, name=getattr(dist_m, "name", None))
return scores
def hydrant_distance_score(dist_m, cap_over_max=True, invalid_to_nan=True):
arr = np.asarray(dist_m, dtype=float)
if invalid_to_nan:
arr = np.where(arr < 0, np.nan, arr)
default_val = 5.0 if cap_over_max else np.nan
scores = np.select(
[arr <= 30, arr <= 60, arr <= 90, arr <= 120, arr <= 150],
[1.0, 2.0, 3.0, 4.0, 5.0],
default=default_val
)
scores = np.where(np.isnan(arr), np.nan, scores)
if np.isscalar(dist_m):
return float(np.asarray(scores).item())
if isinstance(dist_m, pd.Series):
return pd.Series(scores, index=dist_m.index, name=getattr(dist_m, "name", None))
return scores
# 점수 산정
df = pd.read_csv("./Data/건축물대장_소화전_소방서거리.csv")
df["건물노후도점수"] = df["사용승인년도"].apply(aging_score)
df["지상층수점수"] = df["지상층수"].apply(aboveground_floors_score)
df["지하층수점수"] = df["지하층수"].apply(basement_floors_score)
df["주용도점수"] = df["주용도코드명"].apply(main_use_score_exact)
df["구조점수"] = df["구조코드명"].apply(structure_score)
df["비상용승강기점수"] = df["비상용승강기수"].apply(emergency_elevator_score)
df["소방서거리점수"] = df["소방서거리"].apply(firestation_distance_score)
df["소화전거리점수"] = df["소화전거리"].apply(hydrant_distance_score)
df["종합점수"] = df["건물노후도점수"] + df["지상층수점수"] + df["지하층수점수"] + df["주용도점수"] + df["구조점수"] + df["비상용승강기점수"] + df["소방서거리점수"] + df["소화전거리점수"]
df.to_csv("./Data/건축물대장_통합_점수.csv")
```
## 2) 시각화
### 119안전센터 및 소화장치 위치 시각화
```{python}
# 대구광역시 119안전센터 및 소화장치 위치 시각화
# 데이터 출처
# 대구광역시_소방 긴급구조 비상 소화장치 현황
# https://www.data.go.kr/data/15117284/fileData.do
# 소방청_119안전센터 현황
# https://www.data.go.kr/data/15065056/fileData.do
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
loc_119 = pd.read_csv("./Data/대구_소방서_위치.csv")
loc_fire = pd.read_csv("./Data/대구_소방장치_위치.csv")
# 대구광역시 구별 소방 안전센터 시각화
import json
with open ("./Data/시각화/대구_시군구_군위포함/대구_시군구_군위포함.geojson", encoding='utf-8') as f:
geojson_data = json.load(f)
# print(geojson_data.keys())
# 구별 소방 안전센터 scatter_mapbox
fig = px.scatter_mapbox(
loc_119, lat="위도", lon="경도", color="구이름",
hover_name="119안전센터명",
hover_data={"위도": False, "경도": False, "구이름": True, "동이름": True},
zoom=11,
height=650,
)
fig.update_traces(marker=dict(size=15))
# 구별 소방 긴급구조 비상 소화장치 scatter mapbox
fig.add_trace(go.Scattermapbox(
lat=loc_fire["위도"],
lon=loc_fire["경도"],
mode="markers",
marker=go.scattermapbox.Marker(size=5, color="blue"),
name="소화장치",
hovertemplate="<b>구:</b> %{customdata[0]}<br><b>동:</b> %{customdata[1]}<extra></extra>",
customdata=loc_fire[["구이름", "동이름"]].values,
))
fig.update_layout(
mapbox_style="carto-positron",
mapbox_layers=[
{
"sourcetype": "geojson",
"source": geojson_data,
"type": "line",
"color": "green",
"line": {"width": 1},
}
],
mapbox_center={"lat": 35.8714, "lon": 128.6014},
margin={"r":0, "t":30, "l":0, "b":0},
)
fig.show()
```
### 소방서, 소화전 거리 분포 시각화
```{python}
# %% 라이브러리 호출
import pandas as pd
import numpy as np
import plotly.express as px
# %% 데이터 로드
df = pd.read_csv('./Data/건축물대장_통합_점수.csv')
hyd = pd.read_csv('./Raw Data/대구광역시_소화장치_위치데이터.csv')
#firestn = pd.read_csv('대구광역시_소방서_위치데이터.csv', encoding='cp949')
# %%
hydrant_lats = np.radians(hyd["위도"].values)
hydrant_lons = np.radians(hyd["경도"].values)
# %% 거리계산 함수 정의
def haversine_min_distance(lat1, lon1, hy_lats, hy_lons):
R = 6371000 # 지구 반지름 (m)
lat1 = np.radians(lat1)
lon1 = np.radians(lon1)
dlat = hy_lats - lat1
dlon = hy_lons - lon1
a = np.sin(dlat / 2)**2 + np.cos(lat1) * np.cos(hy_lats) * np.sin(dlon / 2)**2
c = 2 * np.arcsin(np.sqrt(a))
distances = R * c
return distances.min()
# %% min({소화전거리(m)})
df['소화전거리'] = df.apply(
lambda row: haversine_min_distance(row["위도"], row["경도"], hydrant_lats, hydrant_lons),
axis=1
)
# %%
df['소화전거리'].head()
# %% 소방서 데이터
firestation = pd.read_csv('./Data/대구_소방서_위치.csv')
firestation.head()
# %% min({소방서거리(m)})
station_lats = np.radians(firestation["위도"].values)
station_lons = np.radians(firestation["경도"].values)
df["소방서거리"] = df.apply(
lambda row: haversine_min_distance(row["위도"], row["경도"], station_lats, station_lons),
axis=1
)
# %% 소방서거리, 소화전거리 분포 시각화
# 소방서거리 분포
fig1 = px.histogram(df, x="소방서거리", nbins=100, title="가장 가까운 소방서 거리 분포", marginal="box")
fig1.update_layout(
bargap=0.1,
xaxis_title="거리(m)",
yaxis_title="건물 수",
template='plotly_white'
)
fig1.show()
# 소화전거리 분포
fig2 = px.histogram(df, x="소화전거리", nbins=100, title="가장 가까운 소화전 거리 분포", marginal="box")
fig2.update_layout(
bargap=0.1,
xaxis_title="거리(m)",
yaxis_title="건물 수",
template='plotly_white'
)
fig2.show()
```
### 노령 인구 시각화
```{python}
#======================================
# 노령 인구 비율 시각화
#======================================
# 동별 노령인구 비율 시각화
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
df = pd.read_csv("./Data/동별인구.csv")
new = df[['군·구', '동·읍·면', '고령자_비율','위도','경도']]
# 동별 고령자 비율 값
g2_by_dong = new.groupby(['동·읍·면'])[['고령자_비율']].sum()
g2_by_dong = g2_by_dong.sort_values(by='고령자_비율',ascending=False)
g2_by_dong.rename(columns={'고령자_비율': '고령자_평균비율'}, inplace=True)
g2_by_dong = g2_by_dong.reset_index()
# g2_by_dong.info()
import geopandas as gpd
gdf = gpd.read_file("./Data/시각화/대구_행정동/대구_행정동_군위포함.shp")
print(gdf.crs)
gdf = gdf.to_crs(epsg=4326)
# gdf.to_file("./Data/대구_행정동_군위포함.geojson", driver="GeoJSON")
import json
with open("./Data/시각화/대구_행정동/대구_행정동_군위포함.geojson", encoding='utf-8') as f:
geojson_data = json.load(f)
# print(geojson_data.keys())
# print(geojson_data['features'][0]['properties'])
# gdf 파일에 유천동이 없고 g2_by_dong 파일에 유천동이 있어 행 삭제
cond = (gdf['ADM_DR_CD'] == '유천동')
gdf[cond]
g2_by_dong.rename(columns={'동·읍·면': 'ADM_DR_NM'}, inplace=True)
cond = (g2_by_dong['ADM_DR_NM'] == '유천동')
g2_by_dong = g2_by_dong.drop(g2_by_dong[cond].index)
# 불로봉무동 이름 변경
g2_by_dong.loc[g2_by_dong['ADM_DR_NM'] == '불로봉무동', 'ADM_DR_NM'] = '불로·봉무동'
# 동별 노령인구 비율 시각화
fig = px.choropleth_mapbox(g2_by_dong,
geojson=geojson_data,
locations="ADM_DR_NM",
featureidkey="properties.ADM_DR_NM",
color="고령자_평균비율",
color_continuous_scale="Greens",
mapbox_style="carto-positron",
center={"lat":35.87702415809577, "lon":128.58970500739858},
zoom=10,
opacity=0.7,
title="대구광역시 동별 노인평균인구비율"
)
fig.update_layout(margin={"r":0,"t":30,"l":0,"b":0})
fig.show()
# ===================================
# 구별 고령자 비율 평균
g1_by_gu = new.groupby(['군·구'])[['고령자_비율']].mean()
g1_by_gu = g1_by_gu.reset_index()
g1_by_gu = g1_by_gu.sort_values(by='고령자_비율',ascending=False)
g1_by_gu.rename(columns={'군·구': 'SIGUNGU_NM', '고령자_비율': '고령자_평균비율',}, inplace=True)
import geopandas as gpd
gdf2 = gpd.read_file("./Data/시각화/대구_시군구_군위포함/대구광역시_시군구_군위포함.shp")
print(gdf2.crs)
gdf2 = gdf2.to_crs(epsg=4326)
# gdf2.to_file("./Data/대구_시군구_군위포함.geojson", driver="GeoJSON")
import json
with open("./Data/시각화/대구_시군구_군위포함/대구_시군구_군위포함.geojson", encoding='utf-8') as f:
geojson_data2 = json.load(f)
print(geojson_data2.keys())
print(geojson_data2['features'][0]['properties'])
# 구별 노령 인구 비율 시각화
fig = px.choropleth_mapbox(g1_by_gu,
geojson=geojson_data2,
locations="SIGUNGU_NM",
featureidkey="properties.SIGUNGU_NM",
color="고령자_평균비율",
color_continuous_scale="Greens",
mapbox_style="carto-positron",
center={"lat":35.87702415809577, "lon":128.58970500739858},
zoom=10,
opacity=0.7,
title="대구광역시 구별 노인평균인구비율"
)
fig.update_layout(margin={"r":0,"t":30,"l":0,"b":0})
fig.show()
```
### 건축물대장 시각화
```{python}
# %% 라이브러리 호출
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
# %% check
# columns_to_check = ['Column14', 'Column15', 'Column60', 'Column61', 'Column67']
# %% 구/군 별 데이터 로드
df1 = pd.read_csv('./Raw Data/건축물대장/건축물대장_대구광역시_군위군.csv')
df2 = pd.read_csv('./Raw Data/건축물대장/건축물대장_대구광역시_남구.csv')
df3 = pd.read_csv('./Raw Data/건축물대장/건축물대장_대구광역시_달서구.csv')
df4 = pd.read_csv('./Raw Data/건축물대장/건축물대장_대구광역시_달성군.csv')
df5 = pd.read_csv('./Raw Data/건축물대장/건축물대장_대구광역시_동구.csv')
df6 = pd.read_csv('./Raw Data/건축물대장/건축물대장_대구광역시_북구.csv')
df7 = pd.read_csv('./Raw Data/건축물대장/건축물대장_대구광역시_서구.csv')
df8 = pd.read_csv('./Raw Data/건축물대장/건축물대장_대구광역시_수성구.csv')
df9 = pd.read_csv('./Raw Data/건축물대장/건축물대장_대구광역시_중구.csv')
# %% 구/군 컬럼 추가
df1['군/구'] = '군위군'
df2['군/구'] = '남구'
df3['군/구'] = '달서구'
df4['군/구'] = '달성군'
df5['군/구'] = '동구'
df6['군/구'] = '북구'
df7['군/구'] = '서구'
df8['군/구'] = '수성구'
df9['군/구'] = '중구'
# %% 구/군 별 데이터 통합
df_all = pd.concat([df1, df2, df3, df4, df5, df6, df7, df8, df9], ignore_index=True)
# %% 구조 분류 딕셔너리 정의
structure_map = {
'목조 계열': ['일반목구조', '목구조', '트러스목구조', '통나무구조'],
'조적식 구조': ['석구조', '벽돌구조', '블록구조', '시멘트블럭조', '흙벽돌조', '조적구조', '기타조적구조'],
'콘크리트 계열': ['철근콘크리트구조','콘크리트구조','프리케스트콘크리트구조','보강콘크리트조','기타콘크리트구조','라멘조'],
'철골 계열': ['일반철골구조','경량철골구조','강파이프구조','철파이프조','기타강구조','스틸하우스조','단일형강구조','철골구조','공업화박판강구조(PEB)','트러스구조',
'철골콘크리트구조','철골철근콘크리트구조','철골철근콘크리트합성구조','기타철골철근콘크리트구조'],
'조립식·판넬·기타': ['조립식판넬조', '컨테이너조'],
'기타 / 특수 구조': ['막구조', '기타구조']
}
# %% 구조 분류
def map_structure_type(name):
for group, items in structure_map.items():
if name in items:
return group
return '미분류'
df_all['구조그룹'] = df_all['구조코드명'].apply(map_structure_type)
# %% 건축 자재별 분포 시각화
structure_counts = df_all['구조그룹'].value_counts()
# 도넛차트 그리기
fig = go.Figure(data=[go.Pie(
labels=structure_counts.index,
values=structure_counts.values,
hole=0.4,
textinfo='percent+label',
hoverinfo='label+value+percent',
insidetextorientation='radial'
)])
fig.update_layout(
title_text='건축 자재별 건물 분포',
annotations=[dict(text='', x=0.5, y=0.5, font_size=18, showarrow=False)],
showlegend=True
)
fig.show()
# %% 주용도 분류 딕셔너리 정의
building_use = {
'숙박/다중이용시설': ['숙박시설', '야영장시설', '관광휴게시설'],
'공장/창고시설': ['공장','창고시설'],
'교육/복지/의료/수련': ['노유자시설', '교육연구시설', '교육연구및복지시설', '의료시설', '수련시설'],
'상업/판매/문화/업무/근린/생활편익':
['제2종근린생활시설',
'근린생활시설',
'제1종근린생활시설',
'종교시설',
'문화및집회시설',
'운동시설',
'업무시설',
'판매시설',
'위락시설',
'판매및영업시설',
'기타제1종근린생활시설',
'생활편익시설',
'소매점'],
'기반시설':
['동물및식물관련시설',
'위험물저장및처리시설',
'자원순환관련시설',
'분뇨.쓰레기처리시설',
'방송통신시설',
'자동차관련시설',
'장례시설',
'운수시설',
'교정및군사시설',
'국방,군사시설',
'발전시설',
'묘지관련시설'],
'주거':
['단독주택',
'공동주택',
'다가구주택'],
'행정/공공':
'공공용시설',
}
# %% 주용도 분류
def use_type(name):
if not isinstance(name, str):
return '미분류'
for group, items in building_use.items():
if name in items:
return group
return '미분류'
df_all['주용도그룹'] = df_all['주용도코드명'].apply(use_type)
# %% 용도별 분포 시각화
use_group_counts = df_all['주용도그룹'].value_counts()
use_group_ratio = use_group_counts / use_group_counts.sum()
# 2% 미만은 기타로 묶기
threshold = 0.02
labels = []
values = []
etc_total = 0
for label, ratio in use_group_ratio.items():
if ratio >= threshold:
labels.append(label)
values.append(use_group_counts[label])
else:
etc_total += use_group_counts[label]
# 기타 항목 추가
if etc_total > 0:
labels.append('기타')
values.append(etc_total)
# 도넛 차트 생성
fig1 = go.Figure(data=[go.Pie(
labels=labels,
values=values,
hole=0.3, # 도넛 중앙 구멍 작게 = 도넛 자체 크게
textinfo='percent+label',
hoverinfo='label+value+percent',
insidetextorientation='radial'
)])
# 레이아웃 조정
fig1.update_layout(
title_text='주용도그룹 분포 (2% 미만 기타로 통합)',
annotations=[dict(text='주용도', x=0.5, y=0.5, font_size=20, showarrow=False)],
showlegend=True,
height=600, # 높이 늘려서 크게 보기
width=700
)
fig1.show()
# %% 용도, 자재 교차 분석 시각화
cross_tab = pd.crosstab(df_all['주용도그룹'], df_all['구조그룹'])
fig2 = go.Figure()
for 구조 in cross_tab.columns:
fig2.add_trace(go.Bar(
x=cross_tab.index,
y=cross_tab[구조],
name=구조
))
# 레이아웃 설정
fig2.update_layout(
barmode='stack', # 스택형 막대
title='주용도그룹 vs 구조그룹 분포 (스택형 막대 그래프)',
xaxis_title='주용도그룹',
yaxis_title='건물 수',
legend_title='구조그룹',
template='plotly_white'
)
fig2.show()
# %% 비상용 승강기 수 분포 시각화
cond_elevator = df_all['지상층수'] >= 5
emergency = df_all[cond_elevator]
# 결측치 0으로 대치
emergency['비상용승강기수'] = emergency['비상용승강기수'].fillna(0).astype(int)
# 5개 이상은 '5개 이상'으로 범주화
def categorize_elevators(x):
return str(x) if x < 5 else '5개 이상'
emergency['비상용승강기_그룹'] = emergency['비상용승강기수'].apply(categorize_elevators)
# 그룹별 건물 수 집계
grouped = emergency['비상용승강기_그룹'].value_counts().sort_index().reset_index()
grouped.columns = ['비상용승강기수', '건물수']
# 파이차트 시각화 (파이 크기 크게 설정)
fig = px.pie(grouped,
names='비상용승강기수',
values='건물수',
title='지상 5층 이상 건물의 비상용 승강기 수 분포',
width=700, height=700, # 파이 크기 조절
color_discrete_sequence=px.colors.sequential.Magma)
# 퍼센트와 라벨 모두 표시
fig.update_traces(textinfo='percent+label',
textfont_size=16,
pull=[0.03]*len(grouped)) # 조각 약간 분리(선택)
fig.show()
# %% 사용승인일 이상값 탐색(보충 필요)
df_all['사용승인일_길이'] = df_all['사용승인일'].astype(str).str.len()
df_all['사용승인일_길이'].unique()
cond = df_all['사용승인일_길이'] == 9
df_all[cond]['사용승인일'].unique()
df_year = df_all.copy()
cond_y9 = df_year['사용승인일_길이'] == 9
df_year.loc[cond_y9, '사용승인일'] = '19' + df_year.loc[cond_y9, '사용승인일'].astype(str)
cond_y11 = df_year['사용승인일'] == '191979100.0'
df_year[cond_y11]
df_year.loc[cond_y11, '사용승인일'] = df_year.loc[cond_y11, '사용승인일'].str[2:]
df_year['사용승인일_길이'] = df_year['사용승인일'].astype(str).str.len()
cond_drop = df_year['사용승인일_길이'].isin([2, 3, 5])
df_year = df_year[~cond_drop]
df_year.loc[:, '사용승인일'] = df_year['사용승인일'].astype(str).str.strip()
# %% 사용승인일(년도) 추출
df_year.loc[:, '사용승인일(년도)'] = df_year['사용승인일'].astype(str).str[:4]
df_year['사용승인일(년도)'] = df_year['사용승인일(년도)'].astype(str).str.strip()
df_year['사용승인일(년도)'].replace('', pd.NA, inplace=True)
df_year['사용승인일(년도)'] = pd.to_numeric(df_year['사용승인일(년도)'], errors='coerce').astype('Int64')
# %% 승인연도 필터링, 연령 계산
cleaned_year = df_year.dropna(subset='사용승인일(년도)')
filltered_year = cleaned_year[cleaned_year['사용승인일(년도)'] >= 1800]
filltered_year['연령'] = 2025 - filltered_year['사용승인일(년도)']
# %% 건축물 연령 분포 시각화
bins = list(range(0, 101, 10)) + [float('inf')]
labels = [f"{i}~{i+10}년" for i in range(0, 100, 10)] + ["100년 이상"]
filltered_year['연령대'] = pd.cut(filltered_year['연령'], bins=bins, labels=labels, right=False)
# 연령대별 건물 수 집계
age_group_counts = filltered_year['연령대'].value_counts().sort_index()
# Plotly로 막대 그래프 시각화
fig5 = px.bar(
x=age_group_counts.index,
y=age_group_counts.values,
labels={'x': '연령대', 'y': '건물 수'},
title='노후화 구간별 건물 수 분포 (10년 단위)',
text=age_group_counts.values,
color=age_group_counts.values,
color_continuous_scale='Viridis'
)
fig5.update_layout(
xaxis_title="노후화 구간",
yaxis_title="건물 수",
uniformtext_minsize=8,
uniformtext_mode='hide',
bargap=0.3
)
fig5.show()
# %% 40년 이상은 한 범주로 처리한 것
bins = [0, 10, 20, 30, 40, float('inf')]
labels = ['0~10년', '10~20년', '20~30년', '30~40년', '40년 이상']
# 2. 구간화
filltered_year['연령대'] = pd.cut(filltered_year['연령'], bins=bins, labels=labels, right=False)
# 3. 집계
age_group_counts = filltered_year['연령대'].value_counts(sort=False)
# 4. 시각화
fig6 = px.bar(
x=age_group_counts.index,
y=age_group_counts.values,
labels={'x': '연령대', 'y': '건물 수'},
title='노후화 구간별 건물 수 분포 (40년 이상 묶음)',
text=age_group_counts.values,
color=age_group_counts.values,
color_continuous_scale='Viridis'
)
fig6.update_layout(
xaxis_title="노후화 구간",
yaxis_title="건물 수",
uniformtext_minsize=8,
uniformtext_mode='hide',
bargap=0.3
)
fig6.show()
# %% 용도, 노후화 교차
bins = [0, 10, 20, 30, 40, float('inf')]
labels = ['0~10년', '10~20년', '20~30년', '30~40년', '40년 이상']
filltered_year['연령대'] = pd.cut(filltered_year['연령'], bins=bins, labels=labels, right=False)
# 2. 교차표 생성: 주용도그룹 × 연령대
cross_tab = pd.crosstab(filltered_year['주용도그룹'], filltered_year['연령대'])
# 3. Plotly로 교차 막대그래프 (그룹별 스택)
fig7 = px.bar(
cross_tab,
x=cross_tab.index,
y=cross_tab.columns,
labels={'value': '건물 수', '주용도그룹': '주용도 그룹', '연령대': '연령대'},
title='주용도 그룹별 연령대별 건물 수',
barmode='stack' # 누적 막대
)
fig7.update_layout(
xaxis_title='주용도 그룹',
yaxis_title='건물 수',
legend_title='연령대',
bargap=0.2
)
fig7.show()
# %%
```
### 종합점수 분포
```{python}
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
df = pd.read_csv('./Data/건축물대장_동추가.csv')
plt.rcParams['font.family'] = 'Malgun Gothic'
plt.rcParams['axes.unicode_minus'] = False
# 종합점수 시각화
plt.figure(figsize=(10, 6))
sns.histplot(df["종합점수"], kde=False, bins=50, color="skyblue")
plt.title("종합점수 분포", fontsize=16)
plt.xlabel("종합점수", fontsize=12)
plt.ylabel("건물 수", fontsize=12)
plt.show()
```